
Conversation

asheshvidyut commented Nov 7, 2025

Motivation and Context

Add support for pluggable transport abstractions in the MCP Python SDK.

This PR primarily adds two abstract classes defining the APIs that every transport must implement:

  • src/mcp/client/transport_session.py -> ClientTransportSession
  • src/mcp/server/transport_session.py -> ServerTransportSession

Both classes expose the minimal APIs that every transport must implement in order to support the features defined in the MCP Specification.

Additionally, the existing JSON-RPC-based session classes now inherit from these two new classes:

  • src/mcp/client/session.py -> ClientSession inherits from ClientTransportSession
  • src/mcp/server/session.py -> ServerSession inherits from ServerTransportSession
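
To make the shape concrete, here is a rough sketch (not the PR's actual code) of what a minimal ClientTransportSession could look like, assuming method names and result types matching the existing ClientSession API and mcp.types:

from abc import ABC, abstractmethod
from typing import Any

import mcp.types as types


class ClientTransportSession(ABC):
    """Minimal API surface a client transport must provide."""

    @abstractmethod
    async def initialize(self) -> types.InitializeResult:
        """Perform the MCP initialization handshake."""

    @abstractmethod
    async def list_tools(self, cursor: str | None = None) -> types.ListToolsResult:
        """List the tools available on the server."""

    @abstractmethod
    async def call_tool(
        self, name: str, arguments: dict[str, Any] | None = None
    ) -> types.CallToolResult:
        """Invoke a named tool with optional arguments."""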

Type Hints Fixes

Since ClientSession and ServerSession now have a higher-level abstraction, this PR also updates type hints to reference the parent classes. Specifically, places that use ClientSession are updated to use ClientTransportSession, and ServerSession type hints are similarly updated to use ServerTransportSession.
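
As a small illustration (the helper function is hypothetical, not from the PR), the change looks like this:

# Before: tied to the JSON-RPC session implementation
async def ping_server(session: ClientSession) -> None:
    await session.send_ping()

# After: accepts any transport implementing the abstract API
async def ping_server(session: ClientTransportSession) -> None:
    await session.send_ping()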

How Has This Been Tested?

Tested using pyright and uv run pytest. Changes are also validated using CI runs.

Breaking Changes

No.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

In the future, if we want to add more transports, they can implement the abstract classes introduced in this PR: ClientTransportSession and ServerTransportSession.

asheshvidyut marked this pull request as ready for review November 7, 2025 09:48
Kludex (Member) commented Jan 23, 2026

The session objects aren't transport-specific; what problem are you trying to solve?


Why does this PR have so many 👍?

asheshvidyut (Author) commented Jan 23, 2026

The session objects aren't transport-specific; what problem are you trying to solve?

Why does this PR have so many 👍?

Hey @Kludex, thanks for your reply.

Currently BaseSession, ServerSession, and ClientSession depend on read/write streams. If we want to add a gRPC transport in a pluggable fashion, the problem is that we don't know the minimum APIs on the server and client side that we need to implement to be fully compatible with the MCP Specification.

Since a gRPC transport does not require read/write streams, this PR creates abstract classes following the interface segregation principle, so they can be inherited by the current transports in this SDK as well as any future transport like gRPC.
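
For context, here is roughly how today's SDK wires a session to a transport's streams - a sketch based on the stdio client, with a placeholder server command:

import anyio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    params = StdioServerParameters(command="my-mcp-server")  # placeholder command
    # The transport yields the read/write stream pair...
    async with stdio_client(params) as (read_stream, write_stream):
        # ...and the session is constructed directly on top of those streams.
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()

anyio.run(main)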

This is part of modelcontextprotocol/modelcontextprotocol#1352 initiative.

Please let me know if I need to explain further. Thanks.

Kludex (Member) commented Jan 23, 2026

[...] dependent on read write streams [...]

Yeah, I don't think there's any problem in that. Again, the session classes do not depend on any transport.

problem now is that we don't know what are the minimum APIs on Server and Client side which we need to implement to be fully compatible with MCP Specification.

The minimum is having the streams; check stdio.py and websockets.py in this repository.

Since gRPC Transport does not require read write streams

Are you talking about what is implemented in #1936? Streams are the same as queues - you do require queues in that PR; replace the asyncio.Queue with the streams.
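
For illustration, a minimal sketch of that swap, assuming anyio 4.x (the item type and values are arbitrary):

import anyio


async def main() -> None:
    # Roughly equivalent to asyncio.Queue(maxsize=32), but split into
    # separate send/receive halves, which is what the SDK transports pass around.
    send_stream, receive_stream = anyio.create_memory_object_stream[str](32)

    async def producer() -> None:
        async with send_stream:
            await send_stream.send("hello")

    async def consumer() -> None:
        async with receive_stream:
            async for item in receive_stream:
                print(item)

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)

anyio.run(main)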

krickert commented

asheshvidyut#8

I've added a PR to merge in here - it's a full implementation of MCP using gRPC.

I've been using gRPC/thrift/avro for a while - and this PR implements full backward compatibility with the proto I created. I'm totally open to changes and hope I can contribute to the efforts.

Most important - it implements true streaming calls. I've added several documents that cover the following:

  1. proto/README.md - Overall review of the full implementation.
  2. proto/README-MCP-TUNNELING-PROPOSAL.md - a full description and architecture of the tunneling and how it was implemented
  3. grpc-streaming-use-cases.md - I wanted to highlight the use cases I would use this for.
  4. google-blog-post-grpc_custom_transport_for_MCP.md - can delete this; I used it as a reference to the CTA that was posted on the Google blog

Kludex (Member) commented Jan 25, 2026

If you want to make a point, please make it here. I don't think it's reasonable to tell me to read documents somewhere else.


I've been using gRPC/thrift/avro for a while - and this PR implements full backward compatibility with the proto I created. I'm totally open to changes and hope I can contribute to the efforts.

Use anyio streams instead of asyncio.Queue - that's the only reason the interface we have right now is not compatible with the one you created.


I'll be closing this, since a new interface or abstraction is not needed.

Kludex closed this Jan 25, 2026
krickert commented Jan 25, 2026

Fair point on the documentation - I'll summarize it in a separate reply. I didn't want to inundate you; apologies for that, as I'd love to have a discussion about the tunneling.

I've refactored to use anyio.create_memory_object_stream instead of asyncio.Queue to align with SDK patterns. The queues were internal coordination for streaming responses - the transport itself uses native gRPC stub calls.

I see Google Cloud just pushed their mcp-grpc-transport-proto today. This validates the typed RPC approach - they're not wrapping JSON-RPC in protobuf, they have typed RPCs for each MCP operation:

service Mcp {
  rpc ListResources(ListResourcesRequest) returns (ListResourcesResponse);
  rpc ListTools(ListToolsRequest) returns (ListToolsResponse);
  rpc CallTool(CallToolRequest) returns (stream CallToolResponse);
  // ...
}

This aligns with what I implemented. However, looking at their proto, I think there's room for improvement:

Google's proto is mostly unary RPCs; only CallTool returns a stream. This means we can't watch for resource changes (we must poll ListResources), there's no parallel tool execution (one tool at a time), there's no chunked reading for large resources, and there's no bidirectional session multiplexing.

They use a dependent_requests/dependent_responses pattern for server-to-client communication, which requires the client to retry requests to receive server-initiated data. This is polling, not streaming.

Here's what I propose:

I've drafted an extension that adds true streaming while staying compatible with Google's base proto:

service McpStreaming {
  // Bidirectional session for multiplexed operations
  rpc Session(stream SessionRequest) returns (stream SessionResponse);

  // Push notifications for resource changes
  rpc WatchResources(WatchResourcesRequest) returns (stream WatchResourcesResponse);

  // Stream large resources in chunks
  rpc ReadResourceChunked(ReadResourceChunkedRequest) returns (stream ResourceChunk);

  // Parallel tool execution
  rpc StreamToolCalls(stream StreamToolCallsRequest) returns (stream StreamToolCallsResponse);
}

Another point I'd like to discuss:

Schema registry integration

There's also an opportunity here for dynamic schema management. MCP tools declare JSON schemas for inputs, but with gRPC we could integrate with schema registries (like Confluent Schema Registry, Apicurio, or Amazon Glue).

This will allow us to do the following (a rough sketch follows the list):

  • Fetch protobuf descriptors at runtime without compile-time codegen
  • Handle schema evolution without redeploying clients
  • Allow gateways to validate/transform without compiled schemas
  • Support multi-tenant systems with different tool schemas per tenant
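
To make the registry idea concrete, here is a rough sketch under stated assumptions: a registry that serves a serialized FileDescriptorSet over HTTP (the endpoint and names are hypothetical), and protobuf >= 4.22 for message_factory.GetMessageClass:

import urllib.request

from google.protobuf import descriptor_pool, message_factory
from google.protobuf.descriptor_pb2 import FileDescriptorSet


def load_message_class(registry_url: str, full_name: str):
    """Fetch descriptors at runtime and build a message class - no codegen."""
    raw = urllib.request.urlopen(registry_url).read()
    fds = FileDescriptorSet.FromString(raw)

    pool = descriptor_pool.DescriptorPool()
    for file_proto in fds.file:
        pool.Add(file_proto)  # register each file's descriptors

    descriptor = pool.FindMessageTypeByName(full_name)
    return message_factory.GetMessageClass(descriptor)


# Hypothetical usage:
# CallToolRequest = load_message_class(
#     "https://registry.example.com/descriptors/mcp", "mcp.v1.CallToolRequest"
# )
# request = CallToolRequest(name="search")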

I've forked Google's proto repo and drafted a proposal: https://github.com/ai-pipestream/mcp-grpc-transport-proto/tree/streaming-extensions (I'll push more changes in a moment)

Questions I'd like feedback on:

  1. Is there any resistance to introducing true streaming calls to the protocol definition for MCP?
  2. Should streaming extensions live in the same proto or a separate file?
  3. Is schema registry integration something worth pursuing for the SDK? It would allow dynamic, strongly typed gRPC integration, making tool schemas a lot easier for an AI to understand.
  4. Can the gRPC definition follow the standards more closely (directories ending with v1/v2, proper naming of classes per spec, etc.)? I have converted mine to follow the spec, but the first ones proposed don't use that standard - it'll help gRPC developers if we do.

Happy to open PRs to either Google's repo or the MCP SDK to continue the discussion with concrete code.

Kludex (Member) commented Jan 25, 2026

I think you are getting ahead of yourself a bit here. I have no interest in, nor opinion on, how the specific gRPC transport implementation should look.

  1. Is there any resistance to introducing true streaming calls to the protocol definition for MCP?

How do you see that happening, and why does the user need to define the chunks the server will send themselves? I'm asking these questions based on the snippet I saw in your branch:

import asyncio

from mcp import StreamPromptCompletionChunk
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("Streaming Prompt Completion")


@mcp.stream_prompt_completion()
async def stream_prompt_completion(name: str, arguments: dict[str, str] | None):
    query = (arguments or {}).get("q", "")
    tokens = [f"Prompt {name}: ", query, " ...done"]

    for token in tokens[:-1]:
        yield StreamPromptCompletionChunk(token=token)
        await asyncio.sleep(0.05)

    yield StreamPromptCompletionChunk(
        token=tokens[-1],
        isFinal=True,
        finishReason="stop",
    )

I don't think the above is how we want to do it at any level - we have a lower-level server and the high-level, more user-friendly FastMCP (today renamed on main to MCPServer).

  2. Should streaming extensions live in the same proto or a separate file?

I don't have an opinion on, nor do I care about, gRPC, but it should not live in this repository given that it's an extension.


Going back to the original intent of this PR: the gRPC transport implemented can't be compliant, given that MCP is tightly coupled with JSONRPC - which is reflected in the schemas in https://github.com/modelcontextprotocol/modelcontextprotocol/blob/main/schema/2025-11-25/schema.ts and in the documentation. So... now I understand why ClientTransportSession and ServerTransportSession are needed here: it's because BaseSession parses/validates/builds the JSONRPC data format.

If we want to have an abstraction that will make sense for transport implementers, the definition of the MCP types itself needs to change.
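
For illustration, a sketch of that coupling, assuming the SDK's current JSONRPCRequest/JSONRPCMessage pydantic models in mcp.types - every request a session sends is built as a JSON-RPC envelope:

from mcp.types import JSONRPCMessage, JSONRPCRequest

# The session layer wraps every MCP operation in this envelope before it
# ever reaches a transport.
message = JSONRPCMessage(
    JSONRPCRequest(jsonrpc="2.0", id=1, method="tools/list", params=None)
)
print(message.model_dump_json(exclude_none=True))
# {"jsonrpc":"2.0","id":1,"method":"tools/list"}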

krickert commented

Totally understand - I should've explained my motivation up front; apologies for trying it here.

I initially saw the gRPC CTA:
https://cloud.google.com/blog/products/networking/grpc-as-a-native-transport-for-mcp

I submitted an email and it was suggested that I join the discussion here for the gRPC work being done - my mistake though; I didn't realize this was the wrong place for it.

To answer your question about the stream_prompt_completion example - you're totally right. I presented it as complete when it was really a proposed API. My main goal is to push for a true streaming client, and your critique is spot-on.

I meant to show what native gRPC streaming could enable, not a thought-out UX. The chunking abstraction definitely shouldn't be exposed to users like that at the high level.

I'll go ahead and focus on the gRPC proto definition with Google's repo. Thanks for the clarification on where this discussion belongs.

Kludex (Member) commented Jan 25, 2026

I submitted an email and it was suggested that I join the discussion here for the gRPC work being done - my mistake though; I didn't realize this was the wrong place for it.

Just to be clear, it's the specificities of gRPC that I don't care about. I'm happy to discuss ways to create an interface that makes it easier for people to work on their own transport implementations. :)

krickert commented

That's actually what I was exploring with the tunneling proposal - a way to create an interface that works for both streaming transports (like gRPC) and cursor-based transports (like JSON-RPC) without changing either wire protocol.

The Problem

Right now, if I want to implement a gRPC transport, I'm forced into "fake streaming" - the server has to buffer everything into a ListToolsResult, then I yield items one by one. The streaming is cosmetic:

Layer     | What Happens                                        | Issue
Server    | result = handler(), then for item in result: yield  | Everything loaded into memory first
Client    | items = []; async for r in stream: items.append(r)  | Buffers entire stream into a list
Interface | list_tools() -> ListToolsResult                     | Forces complete results

No memory or latency benefits - just extra steps.

The Idea: StreamingAdapter

What if streaming was the internal abstraction, and cursor-based transports just emulated it?

from collections.abc import AsyncIterator

import mcp.types as types

# ClientTransportSession and GrpcClientTransport are the classes proposed
# in this thread.


class StreamingAdapter:
    """Unified streaming interface over any transport."""

    def __init__(self, transport: ClientTransportSession):
        self._transport = transport

    async def stream_list_tools(self) -> AsyncIterator[types.Tool]:
        if isinstance(self._transport, GrpcClientTransport):
            # Native streaming - zero overhead
            async for tool in self._transport._stream_list_tools_native():
                yield tool
        else:
            # Cursor-based - iterate pages internally
            cursor = None
            while True:
                result = await self._transport.list_tools(cursor=cursor)
                for tool in result.tools:
                    yield tool
                cursor = result.nextCursor
                if cursor is None:
                    break

    async def list_tools(self) -> types.ListToolsResult:
        """Backward compatible - collects stream into result."""
        tools = [t async for t in self.stream_list_tools()]
        return types.ListToolsResult(tools=tools, nextCursor=None)
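
Hypothetical usage, assuming a connected session implementing the ClientTransportSession API:

async def demo(session: ClientTransportSession) -> None:
    adapter = StreamingAdapter(session)

    # New-style: consume tools as they arrive (true streaming over gRPC,
    # page-by-page iteration over JSON-RPC - same loop either way).
    async for tool in adapter.stream_list_tools():
        print(tool.name)

    # Old-style: identical to today's API, just buffered from the stream.
    result = await adapter.list_tools()
    print(len(result.tools))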

The key points:

  • gRPC transports pass through natively (no overhead)
  • JSON-RPC/SSE transports iterate cursors internally (hidden from app code)
  • list_tools() still works exactly as before - it just buffers the stream
  • New code can use stream_list_tools() if it wants real streaming

What Changes, What Doesn't

Component          | Knows Streaming? | Knows Cursors?   | Changes?
gRPC Transport     | Yes (native)     | No               | None
JSON-RPC Transport | No               | Yes (native)     | None
StreamingAdapter   | Yes              | Yes              | New
Old app code       | No               | Via list_tools() | None
New app code       | Optional         | Hidden           | None

The JSON-RPC wire protocol stays identical. Existing servers and clients don't need updates. Streaming is purely additive.

Backpressure Reality

One thing I didn't want to hide - transports have different backpressure characteristics:

Transport | Backpressure                 | Behavior
gRPC      | Native (HTTP/2 flow control) | Server slows if client can't keep up
JSON-RPC  | None (request-response)      | Client controls pace via cursor timing

The adapter preserves gRPC's backpressure. For cursor-based transports, "backpressure" is implicit in when the client requests the next page. I think adapters should preserve these realities rather than pretending they don't exist.
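
A minimal sketch of how the adapter side could get that behavior essentially for free, assuming anyio 4.x - with a bounded buffer, the producer blocks in send() until the consumer catches up:

import anyio


async def main() -> None:
    send, receive = anyio.create_memory_object_stream[int](max_buffer_size=1)

    async def slow_consumer() -> None:
        async with receive:
            async for item in receive:
                await anyio.sleep(0.1)  # the consumer is the bottleneck
                print("consumed", item)

    async with anyio.create_task_group() as tg:
        tg.start_soon(slow_consumer)
        async with send:
            for i in range(3):
                await send.send(i)  # blocks while the buffer is full
                print("produced", i)

anyio.run(main)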

Why I like this approach

Simply put, I think this gives us the best of both worlds for transport implementers. This approach:

  1. Doesn't touch JSON-RPC at all
  2. Isolates complexity to one new component (StreamingAdapter)
  3. Keeps backward compatibility - old code just works
  4. Lets streaming transports actually stream

I'm not saying this is the right answer - there are probably many ways to do this. I was inspired by how IPv6 tunneling works - the same concept of letting the new protocol work natively where supported while transparently bridging over the old one.

I can write this up in Java and Python. The real win is that MCP implementations could react instantly to requests and maintain 2-way communication between agents without blocking on replies.

asheshvidyut (Author) commented

Thanks for the follow-up, @krickert.

So... now I understand why ClientTransportSession and ServerTransportSession are needed here: it's because BaseSession parses/validates/builds the JSONRPC data format.

@Kludex How should we go about moving ahead?

Kludex (Member) commented Jan 26, 2026

Thanks for the follow-up, @krickert.

So... now I understand why ClientTransportSession and ServerTransportSession are needed here: it's because BaseSession parses/validates/builds the JSONRPC data format.

@Kludex How should we go about moving ahead?

The types in the spec itself need to be decoupled from JSONRPC.

Once they are, we can look for ways to make BaseSession more "pluggable".

krickert commented

@Kludex that's exactly the direction I'd like to work on.

I'd suggest starting with the gRPC proto definition in that repo, with the goal that the JSON-RPC interface stays unchanged and works alongside it - not a separate spec maintained in parallel. The current spec can do this, but it needs true streaming added in the design.

The biggest win from the gRPC spec would be introducing true streaming without cursors - this is where we see significant performance (memory and network) gains as well as better integration with AI workloads in data mesh scenarios and client chatbot services. Once we have a solid spec, a tunneling approach will emerge because the gRPC service would be streaming OOTB.

Also, it's best to design the gRPC definition using gRPC specs and avoid a 100% 1:1 mapping of the JSON-RPC API since the streaming aspect already deviates from it. The reason is that gRPC specs prioritize backward compatibility and cross-language design with an emphasis on schemas. Working through the gRPC definition first will surface the right design questions for the transport layer.

I noticed https://github.com/GoogleCloudPlatform/mcp-grpc-transport-proto was pushed - if we address some of the spec issues there (I opened 2 issues and an initial PR surfacing some of the gRPC concerns), it will make it easier for the individual SDK conversations to work out how a transport layer can be defined.

Kludex (Member) commented Jan 26, 2026

I would look for discussions about decoupling JSONRPC from the MCP types schema in parallel.

markdroth commented

@Kludex This PR is the result of a discussion with the MCP core maintainers in December, where it was agreed that we would add a pluggable transport abstraction to the MCP SDKs. The current abstraction assumes the use of JSON-RPC and therefore doesn't work for transports like gRPC. Google has committed to providing a custom gRPC transport implementation for MCP, but we need an API to plug that transport implementation into.

Note that this PR is not directly related to the gRPC transport; it's really an effort to make it easier for people to experiment with non-JSON-RPC custom transports, of which gRPC is just one example.

For more context, looping in @kurtisvg (chair of the MCP Transport Workgroup) and @kziemski (chair of the pluggable transport track in the transport WG).

kurtisvg commented

Hey @Kludex -- I sent you a ping on discord as well. Let me know if it would help to meet to discuss this.

kziemski commented

@Kludex, as @markdroth mentioned, the track is meant to create a way for non-canon custom transports to co-exist outside of the spec but within the larger community, without resorting to any drastic measures. The reference to gRPC in the comments is unfortunate and isn't meant to be an endorsement of gRPC in the spec. We should have more progress on this track this week, and I can follow up with you on Discord.

krickert commented

I see the MRTR work is formalizing the dependent_requests/dependent_responses sync/polling pattern (Google's proto v1 is an example of this; others can follow). Just want to confirm the distinction: MRTR handles multi-round-trip via polling, while streaming (bidirectional streams, push notifications) would be a v2 consideration. Happy to help the gRPC proto work in this direction (writing PoCs, etc.) - I've started that discussion via an issue on the proto definition. Thanks for your attention to this.

markdroth commented

@krickert Please keep discussion about the gRPC transport in the GoogleCloudPlatform/mcp-grpc-transport-proto repo, where you've already filed an issue. This PR is really intended to be completely independent of any specific custom transport implementation, including gRPC. Thanks!


Labels

  • enhancement - Request for a new feature that's not currently supported
  • P3 - Nice to haves, rare edge cases
  • v2 - Ideas, requests and plans for v2 of the SDK which will incorporate major changes and fixes
